package gu.simplemq.pool;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

import gu.simplemq.Constant;
import gu.simplemq.MQProperties;
import gu.simplemq.MQPropertiesHelper;
import gu.simplemq.pool.BaseMQPool.MQPoolException;
import gu.simplemq.utils.LifoShutdownHooks;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * 基于名字管理的连接池管理器抽象类<br>
 * 默认连接池实例名为 {@value #DEFAULT_NAME}<br>
 * 使用方法:<br>
 * 通过 {@link #getDefaultPool()} 和{@link #getPool(String)}系列方法获取连接池实例<br>
 * 
 * @param <P> 连接池类型
 * @author guyadong
 * @since 2.4.0
 *
 */
@SuppressWarnings("rawtypes")
public class NamedMQPools<P extends BaseMQPool> implements Constant {
	/** 默认连接池实例名 */
	public static final String DEFAULT_NAME = "DEFAULT";
	/** 名字-连接池实例映射 */
	protected final ConcurrentMap<String, P> POOLS = Maps.newConcurrentMap();
	/** 连接池类型 */
	protected final Class<?> poolType;
	protected final MQPropertiesHelper propertiesHelper;

	protected NamedMQPools(MQPropertiesHelper propertiesHelper) {
		this.propertiesHelper = checkNotNull(propertiesHelper, "propertiesHelper is null");
		Type superClass = getClass().getGenericSuperclass();
		this.poolType = getRawClass(((ParameterizedType) superClass).getActualTypeArguments()[0]);
	}
	{
		// JVM 结束时自动清除所有consumer和subscriber,producer对象
		LifoShutdownHooks.addShutdownHook(this.toString(), new Runnable() {

			@Override
			public void run() {
				closeAll();
			}
		});
	}
	/**
	 * 根据连接参数创建实例
	 * 
	 * @param props 连接参数
	 * @return 连接池实例
	 */
	@SuppressWarnings("unchecked")
	public P createPool(Properties props) {
		try {
			return (P) poolType.getConstructor(Properties.class).newInstance(props);
		} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
				| NoSuchMethodException | SecurityException e) {
			Throwables.throwIfUnchecked(e);
			throw new RuntimeException(e);
		}
	}

	/**
	 * 定义一个连接池实例
	 * 
	 * @param name 连接池实例名
	 * @param pool 连接池实例
	 * @param throwIfExists 如果已经存在name对应的实例,是否抛出异常
	 * @throws NamedPoolException 如果已经存在name对应的实例,则抛出异常
	 */
	protected final void define(String name, P pool, boolean throwIfExists) throws NamedPoolException {
		// 尝试将连接池实例加入到名字-连接池实例映射中，如果已经存在name对应的实例,则抛出异常
		P prev = POOLS.putIfAbsent(name, checkNotNull(pool, "pool is null"));
		if (prev!= null) {
			if (throwIfExists) {
				throw new NamedPoolException("the pool named " + name + " already exists");
			}
		}
	}

	/**
	 * 根据指定的连接参数创建实例,如果已经存在name对应的实例,则抛出异常
	 * 
	 * @param name
	 * @param props
	 * @param throwIfExists 如果已经存在name对应的实例,是否抛出异常
	 * @throws IllegalStateException 如果已经存在name对应的实例,则抛出异常
	 */
	public final synchronized void define(String name, Properties props, boolean throwIfExists) throws IllegalStateException {
		P pool = createPool(props);
		try {
			define(name, pool, throwIfExists);
		} catch (NamedPoolException e) {
			pool.close();
		}
	}
	/**
	 * 定义默认连接池实例
	 * 
	 * @param props 连接参数
	 * @return 连接池实例
	 */
	public final P defineDefaultPool(Properties props) {
		P pool = createPool(props);
		define(DEFAULT_NAME, pool, true);
		return pool;
	}
	/**
	 * 返回名字-连接池实例映射中是否包含指定的名字
	 * 
	 * @param name
	 */
	public final boolean exists(String name) {
		return POOLS.containsKey(name);
	}
	/**
	 * 返回所有已经定义的连接池实例的名字
	 */
	public final Set<String> names() {
		return POOLS.keySet();
	}
	/**
	 * 获取默认连接池实例，如果不存在则抛出异常
	 */
	public final P getDefaultPool() {
		return getPool(DEFAULT_NAME);
	}
	
	/**
	 * 获取指定名字的连接池实例，如果不存在则抛出异常
	 * 
	 * @param name
	 */
	public final P getPool(String name) {
		P pool = checkNotNull(POOLS.get(name),"INVALID pool name %s",name);
		if(pool.isClosed()) {
			POOLS.remove(name);
			throw new NamedPoolException("CLOSED POOL: " + name + ", be removed, redefine it please");
		}
		return pool;
	}

	public Collection<P> getPools() {
		return POOLS.values();
	}
	/**
	 * 查找在连接池对象集合中查找对应的匹配的对象,找不到就创建新实例
	 * 
	 * @param props
	 * @param createIfAbsent 如果不存在,是否创建新实例
	 * @return 连接池实例
	 */
	public final synchronized P getPool(Properties props,boolean createIfAbsent) {
		final URI location = propertiesHelper.getLocationlURI(props);
		P found = Iterables.find(POOLS.values(), new Predicate<P>() {
			@Override
			public boolean apply(P p) {
				return location.equals(p.getCanonicalURI());
			}
		}, null);
		if(found == null && createIfAbsent) {
			return createPool(props);
		}
		return found;
	}
	/**	 
	 * 创建连接池实例，执行 {@link BaseMQPool#apply()}和 {@link BaseMQPool#free()}操作，测试连接是否有效  
	 * @param props 连接参数
	 * @throws MQPoolException 无法连接
	 */
	protected void checkConnect0(MQProperties props) throws MQPoolException {
	    try (P pool = createPool(props)){
	    	pool.apply();
			pool.free();
		} 
	}
	/**
	 * 测试接参数是否有效,无效抛出异常
	 * @param input
	 * @param timeoutMills 超时参数(毫秒),为{@code null}使用默认值
	 * @throws MQPoolException 无法连接
	 */
	public void checkConnect(Map input, Long timeoutMills) throws MQPoolException {
		MQProperties props = propertiesHelper.initParameters(input);
		checkConnect0(props);
	}
	/**
	 * 测试接参数是否有效
	 * @param props
	 * @param timeoutMills 超时参数(毫秒),为{@code null}使用默认值
	 * @return 连接失败返回{@code false}
	 */
	public boolean testConnect(Map props, Long timeoutMills) {
		try {
			checkConnect(props, timeoutMills);
			return true;
		} catch (MQPoolException e) {
			return false;
		}
	}
	/**
	 * 关闭并删除所有资源池中的连接池实例
	 */
	public final synchronized void closeAll() {
		for (Iterator<Entry<String, P>> itor = POOLS.entrySet().iterator(); itor.hasNext();) {
			Entry<String, P> entry = itor.next();
			itor.remove();
			logger.debug("CLOSE POOL {}:{}({})",entry.getKey(),entry.getValue().getClass().getName(),entry.getValue().getCanonicalURI());
			entry.getValue().close();
		}
	}
	
	private static Class<?> getRawClass(Type type) {
		if (type instanceof Class<?>) {
			return (Class<?>) type;
		} else if (type instanceof ParameterizedType) {
			return getRawClass(((ParameterizedType) type).getRawType());
		} else {
			throw new IllegalArgumentException("invalid type");
		}
	}
	@SuppressWarnings("serial")
	public static class NamedPoolException extends RuntimeException{
	
		public NamedPoolException(String message) {
			super(message);
		}

		public NamedPoolException(Throwable cause) {
			super(cause);
		}
	}
}
